package com.wulaobo.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author wuwenbo
 * @Date 2024/5/16 15:51
 * @ClassName: KafkaSourceDemo
 * @Description: Demo Flink streaming job that reads string records from a Kafka topic and prints them.
 */
public class KafkaSourceDemo {

    /** Kafka broker address used when no bootstrap-servers argument is supplied. */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "111.229.68.187:19094";

    /** Topic consumed when no topic argument is supplied. */
    private static final String DEFAULT_TOPIC = "test";

    /** Consumer group id used when no group-id argument is supplied. */
    private static final String DEFAULT_GROUP_ID = "test";

    /**
     * Builds and runs a streaming job that consumes string records from Kafka and prints them.
     *
     * <p>All arguments are optional and positional; a missing or blank argument falls back to
     * the built-in default, so running without arguments uses the built-in defaults for all
     * three settings:
     * <ol>
     *   <li>{@code args[0]} - Kafka bootstrap servers ({@code host:port[,host:port...]})</li>
     *   <li>{@code args[1]} - topic to read from</li>
     *   <li>{@code args[2]} - consumer group id</li>
     * </ol>
     *
     * @param args optional overrides for the Kafka connection settings, see above
     * @throws Exception if the job cannot be submitted or fails while executing
     */
    public static void main(String[] args) throws Exception {
        String bootstrapServers = argOrDefault(args, 0, DEFAULT_BOOTSTRAP_SERVERS);
        String topic = argOrDefault(args, 1, DEFAULT_TOPIC);
        String groupId = argOrDefault(args, 2, DEFAULT_GROUP_ID);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                // Start reading from the earliest offset of the topic.
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Only the record value is deserialized (as a String); the key is not used.
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // No watermarks are generated for this source.
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
                .print();

        // The pipeline above is only a definition; execute() actually launches the job.
        env.execute();
    }

    /**
     * Returns the trimmed positional argument at {@code index}, or {@code fallback} when the
     * argument is absent, {@code null}, or blank.
     */
    private static String argOrDefault(String[] args, int index, String fallback) {
        if (args == null || index >= args.length || args[index] == null) {
            return fallback;
        }
        String value = args[index].trim();
        return value.isEmpty() ? fallback : value;
    }

}
